Fork me on GitHub

【Java多线程】JUC集合 02. ConcurrentHashMap

[TOC]

1. 前言

ConcurrentHashMap是线程安全的哈希表

HashMap、HashTable、ConcurrentHashMap对比:

  • HashMap是非线程安全的哈希表
  • HashTable是线程安全的哈希表,通过synchronized来保证线程安全(synchronized方法),效率较低
  • ConcurrentHashMap的哈希表,通过“分段锁”来保证线程安全

2. JDK1.7

2.1 数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
implements ConcurrentMap<K, V>, Serializable {
//默认初始容量
static final int DEFAULT_INITIAL_CAPACITY = 16;

//默认负载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;

//并行级别,segment数
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

//最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;

//每个segment最小容量,2^n
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

// segment最大数
static final int MAX_SEGMENTS = 1 << 16;

final Segment<K,V>[] segments;

...
}

2.2 内部类

2.2.1 Segment

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static final class Segment<K,V> extends ReentrantLock implements Serializable {


transient volatile HashEntry<K,V>[] table;

transient int count;

transient int threshold;//超过该值扩容

final float loadFactor;

Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}

2.2.2 HashEntry

1
2
3
4
5
6
7
8
9
10
11
12
13
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next; //next属性为volatile,并发性

HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
}

2.3 核心函数

2.3.1 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
//并行级别ssize, 保持为2^n
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//默认值sshift = 4, 移位数segmentShift = 28; 掩码segmentMask = 15
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
//cap是segment中HashEntry数组的长度
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// create segments and segments[0] 初始化segment[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),//默认cap=2, 负载因子0.75,阈值为1.5
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}

2.3.2 put(K key, V value)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;//找到key对应的segment
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);//初始化segment[j],以segment[0]为原型,CAS更新
return s.put(key, hash, value, false);
}

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
//tryLock()获取锁,成功返回true,失败返回false。
//获取锁失败的话,则通过scanAndLockForPut()获取锁,并返回要插入的"key-value"对应的HashEntry链表。
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;//segment数组
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);//first 是tab在该位置处的链表表头
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {//直接替换
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)//在scanAndLockForPut()获取锁时,已经新建了key-value对应的HashEntry节点,则将HashEntry添加到Segment中
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);//扩容
else
setEntryAt(tab, index, node);//将新的节点设置成原链表的表头
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

插入流程如下:

  1. 首先找到key对应的segment,没有则进行初始化
  2. 插入key到对应segment的HashEntry数组,在插入前,会先获取segment的互斥锁,插入后再释放锁
  3. 首先根据hash获取key在HashEntry数组的位置,即链表首节点
  4. 遍历链表,若key已经存在,则根据onlyIfAbsent判断是否更新值,再返回;否则新建HashEntry节点,再插入到segment中
  5. 在插入HashEntry节点会进行扩容判断,需要扩容进行rehash(),否则setEntryAt插入hash对应链表表头
  • scanAndLockForPut获取锁如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
//重试次数超过MAX_SCAN_RETRIES(多核64),则进入阻塞队列等待锁
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&//重试偶数次时候,如果表头改变了,则重置e,first,retries,重新走scanAndLockForPut方法
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
  • 扩容rehash()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;//扩容两倍
threshold = (int)(newCapacity * loadFactor);
// 创建新数组
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
// 新的掩码,如从 16 扩容到 32,那么 sizeMask 为 31,对应二进制 ‘000...00011111’
int sizeMask = newCapacity - 1;

// 遍历原数组,老套路,将原数组位置 i 处的链表拆分到 新数组位置 i 和 i+oldCap 两个位置
for (int i = 0; i < oldCapacity ; i++) {
// e 是链表的第一个元素
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
// 计算应该放置在新数组中的位置,
// 假设原数组长度为 16,e 在 oldTable[3] 处,那么 idx 只可能是 3 或者是 3 + 16 = 19
int idx = e.hash & sizeMask;
if (next == null) // 该位置处只有一个元素
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
// e 是链表表头
HashEntry<K,V> lastRun = e;
// idx 是当前链表的头结点 e 的新位置
int lastIdx = idx;

// 下面这个 for 循环会找到一个 lastRun 节点,这个节点之后的所有元素是将要放到一起的
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 将 lastRun 及其之后的所有节点组成的这个链表放到 lastIdx 这个位置
newTable[lastIdx] = lastRun;
// 下面的操作是处理 lastRun 之前的节点,
// 这些节点可能分配在另一个链表中,也可能分配到上面的那个链表中
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
// 将新来的 node 放到新数组中刚刚的 两个链表之一 的 头部
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}

2.3.3 get(Object key)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);//1. 计算hash值
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
//2. 获取key对应的segment,segment中对应的HashEntry链表
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {//3. 遍历链表,找到对应节点
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

2.3.3 并发性

添加节点的操作 put 和删除节点的操作 remove 都要获取segment 上的独占锁,所以它们是线程安全的。

get 过程中是没有加锁的,我们需要考虑的问题是 get 的时候在同一个 segment 中发生了 put 或 remove 操作。

  1. put 操作的线程安全性。

    1. 初始化segment,使用了 CAS 来初始化 Segment 中的数组。
    2. 添加节点到链表的操作是插入到表头的,所以,如果这个时候 get 操作在链表遍历的过程已经到了中间,是不会影响的。当然,另一个并发问题就是 get 操作在 put 之后,需要保证刚刚插入表头的节点被读取,这个依赖于 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
    3. 扩容。扩容是新创建了数组,然后进行迁移数据,最后面将 newTable 设置给属性 table。所以,如果 get 操作此时也在进行,那么也没关系,如果 get 先行,那么就是在旧的 table 上做查询操作;而 put 先行,那么 put 操作的可见性保证就是 table 使用了 volatile 关键字。
  2. remove 操作的线程安全性。

    如果 remove 破坏的节点 get 操作已经过去了,那么这里不存在任何问题。

    如果 remove 先破坏了一个节点,分两种情况考虑。 1、如果此节点是头结点,那么需要将头结点的 next 设置为数组该位置的元素,table 虽然使用了 volatile 修饰,但是 volatile 并不能提供数组内部操作的可见性保证,所以源码中使用了 UNSAFE 来操作数组,请看方法 setEntryAt。2、如果要删除的节点不是头结点,它会将要删除节点的后继节点接到前驱节点中,这里的并发保证就是 next 属性是 volatile 的。

3. JDK1.8

3.1 数据结构

jdk1.8的ConcurrentHashMap不再使用Segment代理Map操作这种设计,整体结构变为HashMap结构,但是依旧保留分段锁的思想。之前版本是每个Segment都持有一把锁,1.8版本改为锁住hash桶的第一个节点 tabAt(table, i)。它可能是Node链表的头结点、保留节点ReservationNode、或者是TreeBin节点(TreeBin节点持有红黑树的根节点)

3.2 内部类

jdk1.8 的节点变为4种:Node链表节点,ReservationNode保留节点,TreeBin节点,红黑树节点TreeNode

  • Node 基本节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//此类的子类具有负数hash值,并且不存储实际的数据,如果不使用子类直接使用这个类,那么key和val不会为null
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
}
  • TreeNode 红黑树节点
1
2
3
4
5
6
7
8
9
10
11
12
13
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;

TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
}
  • TreeBin

TreeBin的hash值固定为-2,它是ConcurrentHashMap中用于代理操作TreeNode的特殊节点,持有存储实际数据的红黑树的根节点。因为红黑树进行写入操作,整个树的结构可能会有很大的变化,这个对读线程有很大的影响,所以TreeBin还要维护一个简单读写锁,这是相对HashMap,这个类新引入这种特殊节点的重要原因。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 红黑树节点TreeNode实际上还保存有链表的指针,因此也可以用链表的方式进行遍历读取操作
// 自身维护一个简单的读写锁,不用考虑写-写竞争的情况
// 不是全部的写操作都要加写锁,只有部分的put/remove需要加写锁
// 很多方法的实现和jdk1.8的ConcurrentHashMap.TreeNode里面的方法基本一样,可以互相参考
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root; // 红黑树结构的跟节点
volatile TreeNode<K,V> first; // 链表结构的头节点
volatile Thread waiter; // 最近的一个设置 WAITER 标识位的线程
volatile int lockState; // 整体的锁状态标识位

// values for lockState
// 二进制001,红黑树的 写锁状态
static final int WRITER = 1; // set while holding write lock
// 二进制010,红黑树的 等待获取写锁的状态
static final int WAITER = 2; // set when waiting for write lock
// 二进制100,红黑树的 读锁状态,读锁可以叠加,也就是红黑树方式可以并发读,每有一个这样的读线程,lockState都加上一个READER的值
static final int READER = 4; // increment value for setting read lock

// 重要的一点,红黑树的 读锁状态 和 写锁状态 是互斥的,但是从ConcurrentHashMap角度来说,读写操作实际上可以是不互斥的
// 红黑树的 读、写锁状态 是互斥的,指的是以红黑树方式进行的读操作和写操作(只有部分的put/remove需要加写锁)是互斥的
// 但是当有线程持有红黑树的 写锁 时,读线程不会以红黑树方式进行读取操作,而是使用简单的链表方式进行读取,此时读操作和写操作可以并发执行
// 当有线程持有红黑树的 读锁 时,写线程可能会阻塞,不过因为红黑树的查找很快,写线程阻塞的时间很短
// 另外一点,ConcurrentHashMap的put/remove/replace方法本身就会锁住TreeBin节点,这里不会出现写-写竞争的情况,因此这里的读写锁可以实现得很简单
}
  • ForwardingNode 转发节点

ForwardingNode是一种临时节点,在扩容进行中才会出现,hash值固定为-1,并且它不存储实际的数据数据。如果旧数组的一个hash桶中全部的节点都迁移到新数组中,旧数组就在这个hash桶中放置一个ForwardingNode。读操作或者迭代读时碰到ForwardingNode时,将操作转发到扩容后的新的table数组上去执行,写操作碰见它时,则尝试帮助扩容。

1
2
3
4
5
6
7
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
}
  • ReservationNode 保留节点

hash值固定为-3,就是个占位符,不会保存实际的数据,正常情况是不会出现的,在jdk1.8新的函数式有关的两个方法computeIfAbsent和compute中才会出现,帮助完成对hash桶的加锁操作

1
2
3
4
5
6
7
8
9
10
static final class ReservationNode<K,V> extends Node<K,V> {
ReservationNode() {
super(RESERVED, null, null, null);
}

// 空节点代表这个hash桶当前为null,所以肯定找不到“相等”的节点
Node<K,V> find(int h, Object k) {
return null;
}
}

3.3 源码分析

3.3.1 初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public ConcurrentHashMap() {
}

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
//初始化容量
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

3.3.2 put(K key, V value)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;//用于记录对应链表的长度
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();//初始化数组
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//数组该位置为空,CAS插入该节点;CAS失败,表示有并发操作,则进入下一个循环
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//数组正在扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);//帮助数据迁移
else {
V oldVal = null;
synchronized (f) {//获取该位置首节点的监视器锁
if (tabAt(tab, i) == f) {
if (fh >= 0) {//说明是链表,红黑树首节点TreeBin的hash为-2
binCount = 1;//记录链表长度
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
//链表末端,将新节点插入链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {//红黑树操作
Node<K,V> p;
binCount = 2;
//红黑树方法插入节点
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);//链表长度超过树化阈值8,将链表转换为红黑树
if (oldVal != null)
return oldVal;
break;
}
}
}
//CAS 式更新baseCount,并判断是否需要扩容
addCount(1L, binCount);
return null;
}
  • 初始化数组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)//其他线程在初始化
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;//table 是volatile属性
sc = n - (n >>> 2);// sc = 0.75n
}
} finally {
sizeCtl = sc;//设置阈值
}
break;
}
}
return tab;
}
  • 链表树化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)//小于最小树化容量64
tryPresize(n << 1);//扩容操作
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {//首节点加锁
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
//遍历链表,建立红黑树
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;//红黑树TreeBin节点
else
tl.next = p;
tl = p;
}
//将红黑树设置到数组相应位置
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
  • 扩容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private final void tryPresize(int size) {
//c 为 1.5*size + 1,向上取最近的2^n
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {//初始化
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//nextTable不为null,CAS设置SIZECTL加1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);//将原来的 tab 数组的元素迁移到新的 nextTable 数组中
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);//外围调用,第一个线程nextTab 为null
}
}
}
  • 数据迁移transfer

此方法支持多线程执行,外围调用此方法的时候,会保证第一个发起数据迁移的线程,nextTab 参数为 null,之后再调用此方法的时候,nextTab 不会为 null。

transfer思路:原数组长度为 n,所以我们有 n 个迁移任务,让每个线程每次负责一个小任务是最简单的,每做完一个任务再检测是否有其他没做完的任务,帮助迁移就可以了,而 Doug Lea 使用了一个 stride,简单理解就是步长,每个线程每次负责迁移其中的一部分,如每次迁移 16 个小任务。所以,我们就需要一个全局的调度者来安排哪个线程执行哪几个任务,这个就是属性 transferIndex 的作用。

第一个发起数据迁移的线程会将 transferIndex 指向原数组最后的位置,然后从后往前的 stride 个任务属于第一个线程,然后将 transferIndex 指向新的位置,再往前的 stride 个任务属于第二个线程,依此类推。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//stride步长,每个线程处理的任务,最小16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//翻倍扩容
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;//用于控制迁移的位置,初始为n
}
int nextn = nextTab.length;
//fwd hash值为MOVED=-1, 原数组i处位置完成迁移,会将i处设置为fwd,标志该位置已经完成迁移
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;//true为做完一个位置的迁移
boolean finishing = false; // to ensure sweep before committing nextTab
//从后往前,i为位置索引,bound为边界
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//简单理解结局:i 指向了 transferIndex-1,bound 指向了 transferIndex-stride
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {//说明原数组的所有位置都有相应的线程去处理了
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//bound为这次迁移的边界
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {//所有迁移操作已经完成
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2
//每有一个线程参与迁移就会将 sizeCtl 加 1,这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;//所有迁移都做完了,进入上面finishing分支
i = n; // recheck before commit
}
}
// 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode 空节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)//ForwardingNode 空节点,该位置已经迁移过了
advance = true; // already processed
else {
synchronized (f) {//对数组该位置首节点加锁
if (tabAt(tab, i) == f) {
//ln对应i处的链表首节点,hn对应i+n处链表首节点
Node<K,V> ln, hn;
if (fh >= 0) {//链表节点
int runBit = fh & n;
Node<K,V> lastRun = f;
//找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的
//lastRun 之前的节点需要进行克隆,然后分到两个链表中
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
//将原数组该位置处设置为 fwd,代表该位置已经处理完毕
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {//红黑树
TreeBin<K,V> t = (TreeBin<K,V>)f;
//lo对应i处的树首节点,hi对应i+n处树首节点
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
//lc,hc对应该位置处节点个数
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;//节点添加到尾部
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//节点数少于UNTREEIFY_THRESHOLD=6,将红黑树转换回链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

3.3.3 get(Object key)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());//计算hash
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {//找到再数组中的位置
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)//表示红黑树或者正在扩容
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {//遍历链表
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

4. 参考

http://www.jasongj.com/java/concurrenthashmap/

https://javadoop.com/post/hashmap#toc5

https://blog.csdn.net/u011392897/article/details/60479937